/* Copyright (C) 2014 InfiniDB, Inc.

   This program is free software; you can redistribute it and/or
   modify it under the terms of the GNU General Public License
   as published by the Free Software Foundation; version 2 of
   the License.

   This program is distributed in the hope that it will be useful,
   but WITHOUT ANY WARRANTY; without even the implied warranty of
   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
   GNU General Public License for more details.

   You should have received a copy of the GNU General Public License
   along with this program; if not, write to the Free Software
   Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
   MA 02110-1301, USA. */

/*******************************************************************************
 * $Id: we_dctnry.cpp 4737 2013-08-14 20:45:46Z bwilkinson $
 *
 *******************************************************************************/
/** @file we_dctnry.cpp
 *  When a signature is given, the value will be stored in the dictionary and
 *  a token will be issued. Given a token, the signature in the dictionary
 *  can be deleted.
 *  This file contains only one class: Dctnry.
 */
#include <cstdio>
#include <cstdlib>
#include <cstring>
#include <vector>
#include <set>
#include <sstream>
#include <inttypes.h>
#include <iostream>
using namespace std;

#include "bytestream.h"
#include "brmtypes.h"
#include "extentmap.h"  // for DICT_COL_WIDTH
#include "we_stats.h"
#include "we_log.h"
#include "we_dctnry.h"
using namespace messageqcpp;
using namespace WriteEngine;
using namespace BRM;
#include "IDBDataFile.h"
#include "IDBPolicy.h"
#include "cacheutils.h"

#include <arrow/api.h>

using namespace idbdatafile;
#include "checks.h"
#include "utils_utf8.h"  // for utf8_truncate_point()

namespace
{
using BinaryArraySharedPtr = std::shared_ptr<arrow::BinaryArray>;
using FixedSizeBinaryArraySharedPtr = std::shared_ptr<arrow::FixedSizeBinaryArray>;

// These used to be member variables, hence the "m_" prefix.  But they are
// all compile-time constants, so I removed them as member variables (now
// declared constexpr).  May change the variable name later (to remove the
// m_ prefix) as time allows.
constexpr uint16_t m_endHeader = DCTNRY_END_HEADER;  // end of header flag (0xffff)
constexpr uint16_t m_offSetZero = BYTE_PER_BLOCK;    // value for 0 offset (8192)
constexpr int m_totalHdrBytes =                      // # bytes in header
    HDR_UNIT_SIZE + NEXT_PTR_BYTES + HDR_UNIT_SIZE + HDR_UNIT_SIZE;
constexpr int START_HDR1 =  // start loc of 2nd offset (HDR1)
    HDR_UNIT_SIZE + NEXT_PTR_BYTES + HDR_UNIT_SIZE;
constexpr int PSEUDO_COL_WIDTH = DICT_COL_WIDTH;  // used to convert row count to block count
constexpr int MAX_BLOB_SIZE = 16777215;           // MCOL-4758 limit TEXT and BLOB to 16MB
}  // namespace

namespace WriteEngine
{
// We will make this a constant for now.  If we ever decide to make
// INITIAL_EXTENT_ROWS_TO_DISK configurable, we will need to move this
// statement, and use Config class to get INITIAL_EXTENT_ROWS_TO_DISK.
// Number of 8K blocks written to disk for the abbreviated first extent of a
// dictionary store file (rows-to-disk scaled by the pseudo column width).
int NUM_BLOCKS_PER_INITIAL_EXTENT = ((INITIAL_EXTENT_ROWS_TO_DISK / BYTE_PER_BLOCK) * PSEUDO_COL_WIDTH);

/*******************************************************************************
 * Description:
 * Dctnry constructor.  Zeroes the working block buffer and pre-builds the
 * template block header (m_dctnryHeader2) that is stamped into every new
 * dictionary block.
 ******************************************************************************/
Dctnry::Dctnry()
 : m_nextPtr(NOT_USED_PTR)
 , m_partition(0)
 , m_segment(0)
 , m_dbRoot(1)
 , m_numBlocks(0)
 , m_lastFbo(0)
 , m_hwm(0)
 , m_newStartOffset(0)
 , m_freeSpace(0)
 , m_curOp(0)
 , m_colWidth(0)
 , m_importDataMode(IMPORT_DATA_TEXT)
{
  memset(m_dctnryHeader, 0, sizeof(m_dctnryHeader));
  memset(m_curBlock.data, 0, sizeof(m_curBlock.data));
  m_curBlock.lbid = INVALID_LBID;
  // add all initial header sizes for an empty block
  m_freeSpace = BYTE_PER_BLOCK - m_totalHdrBytes;

  // Template header layout (byte offsets build up left to right):
  // [free-space][next-ptr][offset-0][end-of-header marker]
  memcpy(m_dctnryHeader2, &m_freeSpace, HDR_UNIT_SIZE);
  memcpy(m_dctnryHeader2 + HDR_UNIT_SIZE, &m_nextPtr, NEXT_PTR_BYTES);
  memcpy(m_dctnryHeader2 + HDR_UNIT_SIZE + NEXT_PTR_BYTES, &m_offSetZero, HDR_UNIT_SIZE);
  memcpy(m_dctnryHeader2 + HDR_UNIT_SIZE + NEXT_PTR_BYTES + HDR_UNIT_SIZE, &m_endHeader, HDR_UNIT_SIZE);
  m_curFbo = INVALID_NUM;
  m_curLbid = INVALID_LBID;
  m_arraySize = 0;

  clear();  // files
}

/*******************************************************************************
 * Description:
 * Dctnry destructor.  Releases the heap-allocated signature buffers held in
 * the string cache (m_sigArray).
 ******************************************************************************/
Dctnry::~Dctnry()
{
  // clear string cache here!
  freeStringCache();
}

/*******************************************************************************
 * Description:
 * Release the heap buffers owned by the dictionary string cache and reset
 * the cache to an empty state.
 ******************************************************************************/
void Dctnry::freeStringCache()
{
  // Every cached entry owns a heap-allocated signature buffer; free each one
  // before discarding the set itself.
  for (const auto& cachedSig : m_sigArray)
  {
    delete[] cachedSig.signature;
  }

  m_arraySize = 0;
  m_sigArray.clear();
}

/*******************************************************************************
 * Description:
 * Reset the in-memory state of this Dctnry object (block position tracking,
 * free space, working block buffer, string-cache count) before working with
 * a store file.
 *
 * PARAMETERS:
 *    none
 *
 * RETURN:
 *    NO_ERROR - state was reset
 ******************************************************************************/
int Dctnry::init()
{
  // Reset block-position tracking.
  m_lastFbo = 0;
  m_hwm = 0;
  m_newStartOffset = 0;

  // Reset per-block bookkeeping.
  m_freeSpace = 0;
  m_curOp = 0;

  // Clear the working block buffer and invalidate its LBID.
  memset(m_curBlock.data, 0, sizeof(m_curBlock.data));
  m_curBlock.lbid = INVALID_LBID;

  // Empty the string-cache entry count.
  m_arraySize = 0;

  return NO_ERROR;
}

/*******************************************************************************
 * Description:
 * Create a dictionary file and initialize the header, or can be used to
 * just add an extent to an already open dictionary store file.
 *
 * PARAMETERS:
 *    input
 *        dctnryOID - dictionary OID
 *        colWidth  - dictionary string width (not the token width)
 *        dbRoot    - DBRoot where file is located
 *        partition - partition number associated with the file
 *        segment   - segment number associated with the file
 *        startLbid - (out) starting LBID of the newly allocated extent
 *        flag      - "true" indicates we are adding the first block and the
 *                    file needs to be created with an abbreviated extent.
 *                    "false" indicates we just want to add an extent to
 *                    an existing file, and the file has already been opened.
 *
 * RETURN:
 *    success    - successfully created file and/or extent
 *    failure    - failed to create file and/or extent
 ******************************************************************************/
int Dctnry::createDctnry(const OID& dctnryOID, int colWidth, const uint16_t dbRoot, const uint32_t partition,
                         const uint16_t segment, LBID_t& startLbid, bool flag)
{
  int allocSize = 0;
  char fileName[FILE_NAME_SIZE];
  int rc;
  std::map<FID, FID> oids;

#ifdef PROFILE
  Stats::startParseEvent(WE_STATS_ALLOC_DCT_EXTENT);
#endif

  if (flag)
  {
    // Allocate extent before file creation.
    // If we got an error while allocating dictionary store extent,
    // we do not need to create/close the file, because it was not created
    // yet. This logic is the same as column segment file creation - at
    // first we allocate an extent, then we create a segment file.
    rc = BRMWrapper::getInstance()->allocateDictStoreExtent((OID)dctnryOID, dbRoot, partition, segment,
                                                            startLbid, allocSize);

    if (rc != NO_ERROR)
    {
      return rc;
    }

    // New file: adopt the caller-supplied location as our current location.
    m_dctnryOID = dctnryOID;
    m_partition = partition;
    m_segment = segment;
    m_dbRoot = dbRoot;

    RETURN_ON_ERROR((rc = oid2FileName(m_dctnryOID, fileName, true, m_dbRoot, m_partition, m_segment)));
    m_segFileName = fileName;

    // if obsolete file exists, "w+b" will truncate and write over
    m_dFile = createDctnryFile(fileName, colWidth, "w+b", DEFAULT_BUFSIZ, startLbid);

    {
      // We presume the path will contain /
      std::string filePath(fileName);
      if (chownDataPath(filePath))
      {
        return ERR_FILE_CHOWN;
      }
    }
  }
  else
  {
    // Existing (already open) file: allocate another extent at our current
    // location and position the file at EOF so the extent is appended.
    rc = BRMWrapper::getInstance()->allocateDictStoreExtent((OID)m_dctnryOID, m_dbRoot, m_partition,
                                                            m_segment, startLbid, allocSize);

    if (rc != NO_ERROR)
    {
      return rc;
    }

    RETURN_ON_ERROR(setFileOffset(m_dFile, 0, SEEK_END));
  }

  // We allocate a full extent from BRM, but only write an abbreviated 256K
  // rows to disk for 1st extent in each store file, to conserve disk usage.
  int totalSize = allocSize;

  if (flag)
  {
    totalSize = NUM_BLOCKS_PER_INITIAL_EXTENT;
  }

  if (!isDiskSpaceAvail(Config::getDBRootByNum(m_dbRoot), totalSize))
  {
    if (flag)
    {
      closeDctnryFile(false, oids);
    }

    return ERR_FILE_DISK_SPACE;
  }

#ifdef PROFILE
  Stats::stopParseEvent(WE_STATS_ALLOC_DCT_EXTENT);
#endif

  if (m_dFile != NULL)
  {
    // MCOL-498 CS optimizes abbreviated extent
    // creation.
    rc = FileOp::initDctnryExtent(m_dFile, m_dbRoot, totalSize, m_dctnryHeader2, m_totalHdrBytes, false,
                                  true,  // explicitly optimize
                                  startLbid);
    if (rc != NO_ERROR)
    {
      if (flag)
      {
        closeDctnryFile(false, oids);
      }

      return rc;
    }
  }
  else
    return ERR_FILE_CREATE;

  if (flag)
  {
    // New file: close it and record an initial HWM of 0 in BRM.
    closeDctnryFile(true, oids);
    m_numBlocks = totalSize;
    m_hwm = 0;
    rc = BRMWrapper::getInstance()->setLocalHWM(m_dctnryOID, m_partition, m_segment, m_hwm);
  }
  else
  {
    // Appended extent: just grow the in-memory block count.
    m_numBlocks = m_numBlocks + totalSize;
  }

  return rc;
}

/*******************************************************************************
 * Description:
 * This function should be called to expand an abbreviated dictionary extent
 * into a full extent on disk.
 *
 * PARAMETERS:
 *    none
 *
 * RETURN:
 *    success    - successfully expanded extent
 *    failure    - failed to expand extent
 ******************************************************************************/
int Dctnry::expandDctnryExtent()
{
  RETURN_ON_NULL(m_dFile, ERR_FILE_SEEK);

  // Remember the current file position; the fill below seeks to EOF, and we
  // must restore the caller's position afterwards.
  off64_t oldOffset = m_dFile->tell();

  RETURN_ON_ERROR(setFileOffset(m_dFile, 0, SEEK_END));

  // Based on extent size, see how many blocks to add to fill the extent
  int blksToAdd =
      (((int)BRMWrapper::getInstance()->getExtentRows() - INITIAL_EXTENT_ROWS_TO_DISK) / BYTE_PER_BLOCK) *
      PSEUDO_COL_WIDTH;

  if (!isDiskSpaceAvail(Config::getDBRootByNum(m_dbRoot), blksToAdd))
  {
    return ERR_FILE_DISK_SPACE;
  }

  // Append the additional blocks, each initialized with an empty dictionary
  // block header.
  int rc = FileOp::initDctnryExtent(m_dFile, m_dbRoot, blksToAdd, m_dctnryHeader2, m_totalHdrBytes, true,
                                    true);  // explicitly optimize

  if (rc != NO_ERROR)
    return rc;

  // Restore offset back to where we were before expanding the extent
  RETURN_ON_ERROR(setFileOffset(m_dFile, oldOffset, SEEK_SET));

  // Update block count to reflect disk space added by expanding the extent.
  m_numBlocks = m_numBlocks + blksToAdd;

  return rc;
}

/*******************************************************************************
 * DESCRIPTION:
 *    Close the dictionary store file, flushing the working block buffer
 *    (if dirty) and saving the local HWM to BRM.
 *
 * PARAMETERS:
 *    realClose - if false (and the file is compressed), the underlying file
 *                handle may be left open for reuse; uncompressed files are
 *                always fully closed (@Bug 5572)
 *
 * RETURN:
 *    NO_ERROR on success, else an error code from the flush or HWM update
 ******************************************************************************/
int Dctnry::closeDctnry(bool realClose)
{
  if (!m_dFile)
    return NO_ERROR;

  int rc;
  CommBlock cb;
  cb.file.oid = m_dctnryOID;
  cb.file.pFile = m_dFile;
  std::map<FID, FID> oids;

  // Flush the in-memory block first if it carries unwritten changes.
  if (m_curBlock.state == BLK_WRITE)
  {
    rc = writeDBFile(cb, &m_curBlock, m_curBlock.lbid);

    if (rc != NO_ERROR)
    {
      closeDctnryFile(false, oids);
      return rc;
    }

    memset(m_curBlock.data, 0, sizeof(m_curBlock.data));
    // m_curBlock.state== BLK_INIT;
  }

  //@Bug 5572. always close file for uncompressed file.
  if (FileOp::compressionType() == 0)
    realClose = true;

  if (realClose)
  {
    //@Bug 5689. Need pass oid to write to the right file.
    oids[m_dctnryOID] = m_dctnryOID;
    // dmc-error handling (should detect/report error in closing file)
    closeDctnryFile(true, oids);
  }

  m_hwm = (HWM)m_lastFbo;
  idbassert(utils::is_nonnegative(m_dctnryOID));

  // For HDFS, have PrimProc drop its cached file descriptors and flush cached
  // blocks for this OID, so readers do not see stale data after the close.
  if (idbdatafile::IDBPolicy::useHdfs() && realClose)
  {
    BRM::FileInfo aFile;
    std::vector<BRM::OID_t> oidsToFlush;
    oidsToFlush.push_back(m_dctnryOID);
    aFile.oid = m_dctnryOID;
    aFile.partitionNum = m_partition;
    aFile.segmentNum = m_segment;
    aFile.dbRoot = m_dbRoot;
    aFile.compType = FileOp::compressionType();
    std::vector<BRM::FileInfo> aFileInfo;
    aFileInfo.push_back(aFile);
    cacheutils::purgePrimProcFdCache(aFileInfo, Config::getLocalModuleID());
    cacheutils::flushOIDsFromCache(oidsToFlush);
  }

  rc = BRMWrapper::getInstance()->setLocalHWM(m_dctnryOID, m_partition, m_segment, m_hwm);

  if (rc != NO_ERROR)
    return rc;

  // cout <<"Init called! m_dctnryOID ="  << m_dctnryOID << endl;
  freeStringCache();

  return NO_ERROR;
}

/*******************************************************************************
 * DESCRIPTION:
 *    Close the dictionary store file without flushing the working block
 *    buffer and without updating the HWM in BRM.  The string cache is
 *    released as part of closing.
 *
 * PARAMETERS:
 *    none
 *
 * RETURN:
 *    NO_ERROR (also returned when no file was open)
 ******************************************************************************/
int Dctnry::closeDctnryOnly()
{
  // Nothing to do when no store file is currently open.
  if (!m_dFile)
    return NO_ERROR;

  // dmc-error handling (should detect/report error in closing file)
  std::map<FID, FID> unusedOids;  // no OIDs need flushing on this path
  closeDctnryFile(false, unusedOids);

  freeStringCache();

  return NO_ERROR;
}

/*******************************************************************************
 * DESCRIPTION:
 *    drop/delete dictionary file
 *
 * PARAMETERS:
 *    dctnryOID -- file number to drop
 *
 * RETURN:
 *    NO_ERROR on success, else the error code from close/delete
 ******************************************************************************/
int Dctnry::dropDctnry(const OID& dctnryOID)
{
  m_dctnryOID = dctnryOID;

  // An open file must be closed (flushing any pending block) before the
  // underlying segment file is removed.
  if (m_dFile)
  {
    RETURN_ON_ERROR(closeDctnry());
  }

  return deleteFile(dctnryOID);
}

/*******************************************************************************
 * DESCRIPTION:
 *    open dictionary file
 *
 * PARAMETERS:
 *    dctnryOID-- for open dictionary file
 *    dbRoot   -- DBRoot for dictionary store segment file
 *    partition-- partition for dictionary store segment file
 *    segment  -- segment for dictionary store segment file
 *    useTmpSuffix - for Bulk HDFS usage: use or not use *.tmp file suffix
 *
 * RETURN:
 *    successful- NO_ERROR
 *    Fail      - Error Code
 ******************************************************************************/
// @bug 5572 - HDFS usage: add *.tmp file backup flag
int Dctnry::openDctnry(const OID& dctnryOID, const uint16_t dbRoot, const uint32_t partition,
                       const uint16_t segment, const bool useTmpSuffix)
{
#ifdef PROFILE
  Stats::startParseEvent(WE_STATS_OPEN_DCT_FILE);
#endif
  int rc = NO_ERROR;
  m_dctnryOID = dctnryOID;
  m_dbRoot = dbRoot;
  m_partition = partition;
  m_segment = segment;

  m_dFile = openDctnryFile(useTmpSuffix);

  // Log and bail out if the segment file could not be opened.
  if (m_dFile == NULL)
  {
    ostringstream oss;
    oss << "oid:partition:segment " << dctnryOID << ":" << partition << ":" << segment;
    logging::Message::Args args;
    logging::Message message(1);
    args.add("Error opening dictionary file ");
    args.add(oss.str());
    args.add("");
    args.add("");
    message.format(args);
    logging::LoggingID lid(21);
    logging::MessageLog ml(lid);

    ml.logErrorMessage(message);
    return ERR_FILE_OPEN;
  }

  m_numBlocks = numOfBlocksInFile();
  std::map<FID, FID> oids;

  // Initialize other misc member variables
  init();

  // Fetch the HWM so we resume writing at the last-used block of the file.
  int extState;
  rc = BRMWrapper::getInstance()->getLocalHWM(dctnryOID, m_partition, m_segment, m_hwm, extState);

  if (rc != NO_ERROR)
  {
    closeDctnryFile(false, oids);
    return rc;
  }

  m_lastFbo = (int)m_hwm;

  // Map the HWM block (FBO) to its LBID so it can be read into m_curBlock.
  memset(m_curBlock.data, 0, sizeof(m_curBlock.data));
  m_curFbo = m_lastFbo;
  rc = BRMWrapper::getInstance()->getBrmInfo(m_dctnryOID, m_partition, m_segment, m_curFbo, m_curLbid);

  if (rc != NO_ERROR)
  {
    closeDctnryFile(false, oids);
    return rc;
  }

  CommBlock cb;
  cb.file.oid = m_dctnryOID;
  cb.file.pFile = m_dFile;
#ifdef PROFILE
  // We omit the call to readDBFile from OPEN_DCT_FILE stats, because com-
  // pressed files have separate stats that readDBFile() will capture thru
  // ChunkManager::fetchChunkFromFile().
  Stats::stopParseEvent(WE_STATS_OPEN_DCT_FILE);
#endif
  rc = readDBFile(cb, m_curBlock.data, m_curLbid);
#ifdef PROFILE
  Stats::startParseEvent(WE_STATS_OPEN_DCT_FILE);
#endif

  if (rc != NO_ERROR)
  {
    closeDctnryFile(false, oids);
    return rc;
  }

  //@Bug 5567  Don't seek for compressed file.
  if (m_compressionType == 0)
  {
    // Position file to the start of the current block;
    // Determine file byte offset based on the current block offset (m_curFbo)
    long long byteOffset = ((long long)m_curFbo) * (long)BYTE_PER_BLOCK;
    rc = setFileOffset(m_dFile, byteOffset);

    if (rc != NO_ERROR)
    {
      closeDctnryFile(false, oids);
      return rc;
    }
  }

  // Mark the working block as freshly read and recover its op (string) count
  // from the on-disk header.
  m_curBlock.lbid = m_curLbid;
  m_curBlock.state = BLK_READ;
  int opCnt = 0;
  // Get new free space (m_freeSpace) from header too! Here!!!!!!!!!!!!!!!
  getBlockOpCount(m_curBlock, opCnt);
  m_curOp = opCnt;

  // "If" this store file contains no more than 1 block, then we preload
  // the string cache used to recognize duplicates during row insertion.
  if (m_hwm == 0)
  {
    preLoadStringCache(m_curBlock);
  }

#ifdef PROFILE
  Stats::stopParseEvent(WE_STATS_OPEN_DCT_FILE);
#endif

  return rc;
}

/*******************************************************************************
 * Description:
 * Determine if the specified signature is present in the string cache.
 *
 * PARAMETERS:
 *    input/output
 *       sig - signature to search for; on a hit, sig.token is set to the
 *             token of the cached entry
 *
 * RETURN:
 *    true  - if signature is found (sig.token is updated)
 *    false - if signature is not found
 ******************************************************************************/
bool Dctnry::getTokenFromArray(Signature& sig)
{
  std::set<Signature, sig_compare>::const_iterator it = m_sigArray.find(sig);

  if (it == m_sigArray.end())
    return false;

  // Found a cached entry with a matching signature; hand its token back to
  // the caller.  (Copy only the token, not the whole Signature.)
  sig.token = it->token;
  return true;
}

/*******************************************************************************
 * Description:
 * Used by bulk import to insert a signature into m_curBlock, and update
 * the m_curBlock header accordingly.  A signature larger than the block's
 * free space is spilled across consecutive blocks; sig.token.bc counts the
 * continuation blocks.
 *
 * PARAMETERS:
 *    input
 *       sig   - signature to be inserted
 *    output
 *       token - token that was assigned to the inserted signature
 *
 * RETURN:
 *    success    - successfully write the signature to the block
 *    failure    - failed to extend/create an extent for the block
 ******************************************************************************/
int Dctnry::insertDctnry2(Signature& sig)
{
  int rc = 0;
  int write_size;
  bool lbid_in_token = false;
  // sig.size/sig.signature are consumed below as chunks are written; save
  // them so the caller's view can be restored before returning.
  size_t origSigSize = sig.size;
  unsigned char* origSig = sig.signature;

  sig.token.bc = 0;

  // Loop until the entire signature is written AND a token LBID is recorded.
  while (sig.size > 0 || !lbid_in_token)
  {
    // Write as much as fits in the current block (one header unit is
    // reserved for the new offset entry).
    if (sig.size > (m_freeSpace - HDR_UNIT_SIZE))
    {
      write_size = (m_freeSpace - HDR_UNIT_SIZE);
    }
    else
    {
      write_size = sig.size;
    }

    insertDctnryHdr(m_curBlock.data, write_size);
    insertSgnture(m_curBlock.data, write_size, (unsigned char*)sig.signature);

    sig.size -= write_size;
    sig.signature += write_size;
    m_curFbo = m_lastFbo;

    // The token records the LBID and op ordinal of the FIRST chunk only.
    if (!lbid_in_token)
    {
      sig.token.fbo = m_curLbid;
      sig.token.op = m_curOp;
      lbid_in_token = true;
    }

    // More signature bytes remain: flush this block and advance to a fresh
    // one, expanding/allocating an extent when the current one is full.
    if (sig.size > 0)
    {
      CommBlock cb;
      cb.file.oid = m_dctnryOID;
      cb.file.pFile = m_dFile;
      sig.token.bc++;

      RETURN_ON_ERROR(writeDBFileNoVBCache(cb, &m_curBlock, m_curFbo));
      memset(m_curBlock.data, 0, sizeof(m_curBlock.data));
      memcpy(m_curBlock.data, &m_dctnryHeader2, m_totalHdrBytes);
      m_freeSpace = BYTE_PER_BLOCK - m_totalHdrBytes;
      m_curBlock.state = BLK_WRITE;
      m_curOp = 0;
      m_lastFbo++;
      m_curFbo = m_lastFbo;

      //...Expand current extent if it is an abbreviated initial extent
      if ((m_curFbo == m_numBlocks) && (m_numBlocks == NUM_BLOCKS_PER_INITIAL_EXTENT))
      {
        RETURN_ON_ERROR(expandDctnryExtent());
      }

      //...Allocate a new extent if we have reached the last block in the
      //   current extent.
      if (m_curFbo == m_numBlocks)
      {
        // last block
        // for roll back the extent to use
        // Save those empty extents in case of failure to rollback
        std::vector<ExtentInfo> dictExtentInfo;
        ExtentInfo info;
        info.oid = m_dctnryOID;
        info.partitionNum = m_partition;
        info.segmentNum = m_segment;
        info.dbRoot = m_dbRoot;
        info.hwm = m_hwm;
        info.newFile = false;
        dictExtentInfo.push_back(info);
        LBID_t startLbid;
        // Add an extent.
        rc = createDctnry(m_dctnryOID,
                          0,  // dummy column width
                          m_dbRoot, m_partition, m_segment, startLbid, false);

        if (rc != NO_ERROR)
        {
          // roll back the extent
          BRMWrapper::getInstance()->deleteEmptyDictStoreExtents(dictExtentInfo);
          return rc;
        }
      }

      RETURN_ON_ERROR(
          BRMWrapper::getInstance()->getBrmInfo(m_dctnryOID, m_partition, m_segment, m_curFbo, m_curLbid));
      m_curBlock.lbid = m_curLbid;
    }
  }

  // Restore the caller's signature pointer/length.
  sig.size = origSigSize;
  sig.signature = origSig;
  return NO_ERROR;
}

/*******************************************************************************
 * Description:
 * Helper used by bulk import to tokenize one signature: truncates it to the
 * column width (charset-aware), consults the string cache, and inserts it
 * into the current block, advancing to a new block / expanding the
 * abbreviated extent / allocating a new extent as needed.
 *
 * PARAMETERS:
 *    input
 *       curSig  - signature to insert; curSig.size may be reduced here to
 *                 honor the column width
 *       found   - scratch flag (passed by value, so the caller's copy is
 *                 not affected)
 *       cb      - comm block identifying the store file being written
 *       cs      - character set used for truncation calculations
 *       weType  - column type; WR_TEXT truncates by bytes, not characters
 *    input/output
 *       pOut/outOffset - token output buffer and current write offset
 *       startPos       - current row index; advanced when a token is emitted
 *       totalUseSize   - scratch: header bytes + signature size
 *       next           - set while advancing to the next block; reset here
 *       truncCount     - incremented each time a signature is truncated
 *
 * RETURN:
 *    success    - signature tokenized; token appended to pOut
 *    failure    - error writing a block or allocating an extent
 ******************************************************************************/
int Dctnry::insertDctnry1(Signature& curSig, bool found, char* pOut, int& outOffset, int& startPos,
                          int& totalUseSize, CommBlock& cb, bool& next, long long& truncCount,
                          const CHARSET_INFO* cs, const WriteEngine::ColType& weType)
{
  if (cs->mbmaxlen > 1)
  {
    // For TEXT columns, we truncate based on the number of bytes,
    // and not based on the number of characters, as for CHAR/VARCHAR
    // columns in the else block.
    if (weType == WriteEngine::WR_TEXT)
    {
      if (curSig.size > m_colWidth)
      {
        uint8_t truncate_point = utf8::utf8_truncate_point((const char*)curSig.signature, m_colWidth);
        curSig.size = m_colWidth - truncate_point;
        truncCount++;
      }
    }
    else
    {
      const char* start = (const char*)curSig.signature;
      const char* end = (const char*)(curSig.signature + curSig.size);
      size_t numChars = cs->numchars(start, end);
      size_t maxCharLength = m_colWidth / cs->mbmaxlen;

      if (numChars > maxCharLength)
      {
        MY_STRCOPY_STATUS status;
        cs->well_formed_char_length(start, end, maxCharLength, &status);
        curSig.size = status.m_source_end_pos - start;
        truncCount++;
      }
    }
  }
  else  // cs->mbmaxlen == 1
  {
    // Single-byte charset: bytes == characters, so truncate directly.
    if (curSig.size > m_colWidth)
    {
      curSig.size = m_colWidth;
      truncCount++;
    }
  }

  //...Search for the string in our string cache
  // if it fits into one block (< 8KB)
  if (curSig.size <= MAX_SIGNATURE_SIZE)
  {
    // Stats::startParseEvent("getTokenFromArray");
    found = getTokenFromArray(curSig);

    if (found)
    {
      // Cache hit: emit the previously assigned 8-byte token and move on.
      memcpy(pOut + outOffset, &curSig.token, 8);
      outOffset += 8;
      startPos++;
      // Stats::stopParseEvent("getTokenFromArray");
      return NO_ERROR;
    }

    // Stats::stopParseEvent("getTokenFromArray");
  }

  totalUseSize = m_totalHdrBytes + curSig.size;

  //...String not found in cache, so proceed.
  //   If room is available in current block then insert into block.
  // @bug 3960: Add MAX_OP_COUNT check to handle case after bulk rollback
  if (((totalUseSize <= m_freeSpace - HDR_UNIT_SIZE) ||
       ((curSig.size > 8176) && (m_freeSpace > HDR_UNIT_SIZE))) &&
      (m_curOp < (MAX_OP_COUNT - 1)))
  {
    RETURN_ON_ERROR(insertDctnry2(curSig));  // m_freeSpace updated!
    m_curBlock.state = BLK_WRITE;
    memcpy(pOut + outOffset, &curSig.token, 8);
    outOffset += 8;
    startPos++;
    found = true;

    //...If we have reached limit for the number of strings allowed in
    //   a block, then we write the current block so that we can start
    //   another block.
    if (m_curOp >= MAX_OP_COUNT - 1)
    {
#ifdef PROFILE
      Stats::stopParseEvent(WE_STATS_PARSE_DCT);
#endif
      RETURN_ON_ERROR(writeDBFileNoVBCache(cb, &m_curBlock, m_curFbo));
      m_curBlock.state = BLK_READ;
      next = true;
    }

    //...Add string to cache, if we have not exceeded cache limit
    // Don't cache big blobs
    if ((m_arraySize < MAX_STRING_CACHE_SIZE) && (curSig.size <= MAX_SIGNATURE_SIZE))
    {
      addToStringCache(curSig);
    }
  }
  else  //...No room for this string in current block, so we write
        //   out the current block, so we can start another block
  {
#ifdef PROFILE
    Stats::stopParseEvent(WE_STATS_PARSE_DCT);
#endif
    RETURN_ON_ERROR(writeDBFileNoVBCache(cb, &m_curBlock, m_curFbo));
    m_curBlock.state = BLK_READ;
    next = true;
    found = false;
  }  // if m_freeSpace
}

  //..."next" flag is used to indicate that we need to advance to the
  //   next block in the store file.
  if (next)
  {
    // Start a fresh block: stamp the template header and reset counters.
    memset(m_curBlock.data, 0, sizeof(m_curBlock.data));
    memcpy(m_curBlock.data, &m_dctnryHeader2, m_totalHdrBytes);
    m_freeSpace = BYTE_PER_BLOCK - m_totalHdrBytes;
    m_curBlock.state = BLK_WRITE;
    m_curOp = 0;
    next = false;
    m_lastFbo++;
    m_curFbo = m_lastFbo;

    //...Expand current extent if it is an abbreviated initial extent
    if ((m_curFbo == m_numBlocks) && (m_numBlocks == NUM_BLOCKS_PER_INITIAL_EXTENT))
    {
      RETURN_ON_ERROR(expandDctnryExtent());
    }

    //...Allocate a new extent if we have reached the last block in the
    //   current extent.
    if (m_curFbo == m_numBlocks)
    {
      // last block
      LBID_t startLbid;

      // Add an extent.
      RETURN_ON_ERROR(
          createDctnry(m_dctnryOID, m_colWidth, m_dbRoot, m_partition, m_segment, startLbid, false));

      if (m_logger)
      {
        std::ostringstream oss;
        oss << "Add dictionary extent OID-" << m_dctnryOID << "; DBRoot-" << m_dbRoot << "; part-"
            << m_partition << "; seg-" << m_segment << "; hwm-" << m_curFbo << "; LBID-" << startLbid
            << "; file-" << m_segFileName;
        m_logger->logMsg(oss.str(), MSGLVL_INFO2);
      }

      m_curLbid = startLbid;

      // now seek back to the curFbo, after adding an extent
      // @bug5769 For uncompressed only;
      // ChunkManager manages the file offset for the compression case
      if (m_compressionType == 0)
      {
#ifdef PROFILE
        Stats::startParseEvent(WE_STATS_PARSE_DCT_SEEK_EXTENT_BLK);
#endif
        long long byteOffset = m_curFbo;
        byteOffset *= BYTE_PER_BLOCK;
        RETURN_ON_ERROR(setFileOffset(m_dFile, byteOffset));
#ifdef PROFILE
        Stats::stopParseEvent(WE_STATS_PARSE_DCT_SEEK_EXTENT_BLK);
#endif
      }
    }
    else
    {
      // LBIDs are numbered collectively and consecutively within an
      // extent, so within an extent we can derive the LBID by simply
      // incrementing it rather than having to go back to BRM to look
      // up the LBID for each FBO.
      m_curLbid++;
    }

#ifdef PROFILE
    Stats::startParseEvent(WE_STATS_PARSE_DCT);
#endif
    m_curBlock.lbid = m_curLbid;

    //..."found" flag indicates whether the string was already found
    //   "or" added to the end of the previous block.  If false, then
    //   we need to add the string to the new block.
    if (!found)
    {
      RETURN_ON_ERROR(insertDctnry2(curSig));  // m_freeSpace updated!
      m_curBlock.state = BLK_WRITE;
      memcpy(pOut + outOffset, &curSig.token, 8);
      outOffset += 8;
      startPos++;

      //...Add string to cache, if we have not exceeded cache limit
      if ((m_arraySize < MAX_STRING_CACHE_SIZE) && (curSig.size <= MAX_SIGNATURE_SIZE))
      {
        addToStringCache(curSig);
      }
    }
  }  // if next

  return NO_ERROR;
}

/*******************************************************************************
 * Description:
 * Used by bulk import to insert batch of parquet strings into this store file.
 * Function assumes that the file is already positioned to the current block.
 *
 * PARAMETERS:
 *    input
 *       columnData - arrow array containing input strings
 *       startRowIdx - start position for current batch parquet data
 *       totalRow - number of rows in "buf"
 *       col - column of strings to be parsed from "buf"
 *       cs  - character set used for truncation in insertDctnry1
 *       weType - write-engine column type (WR_TEXT truncates by bytes)
 *    output
 *       tokenBuf   - tokens assigned to inserted strings
 *       truncCount - incremented for each truncated signature
 *
 * RETURN:
 *    success     - successfully write the header to block
 *    failure     - it did not write the header to block
 ******************************************************************************/
int Dctnry::insertDctnryParquet(std::shared_ptr<arrow::Array> columnData, int startRowIdx, const int totalRow,
                                const int col, char* tokenBuf, long long& truncCount, const CHARSET_INFO* cs,
                                const WriteEngine::ColType& weType)
{
#ifdef PROFILE
  Stats::startParseEvent(WE_STATS_PARSE_DCT);
#endif
  int startPos = 0;
  int totalUseSize = 0;

  int outOffset = 0;
  const char* pIn;
  char* pOut = tokenBuf;
  Signature curSig;
  bool found = false;
  bool next = false;
  CommBlock cb;
  cb.file.oid = m_dctnryOID;
  cb.file.pFile = m_dFile;
  WriteEngine::Token nullToken;

  BinaryArraySharedPtr binaryArray;
  FixedSizeBinaryArraySharedPtr fixedSizeBinaryArray;

  // Variable-length arrow data is accessed via BinaryArray; fixed-width via
  // FixedSizeBinaryArray.  Only the matching pointer is populated.
  if (columnData->type_id() != arrow::Type::type::FIXED_SIZE_BINARY)
    binaryArray = std::static_pointer_cast<arrow::BinaryArray>(columnData);
  else
    fixedSizeBinaryArray = std::static_pointer_cast<arrow::FixedSizeBinaryArray>(columnData);

  // check if this column data imported is NULL array or not
  bool isNonNullArray = columnData->type_id() == arrow::Type::type::NA ? false : true;

  //...Loop through all the rows for the specified column
  while (startPos < totalRow)
  {
    found = false;
    void* curSigPtr = static_cast<void*>(&curSig);
    memset(curSigPtr, 0, sizeof(curSig));

    // if this column is not null data
    if (isNonNullArray)
    {
      const uint8_t* data;

      // Fetch the raw bytes and length for this row from whichever arrow
      // array type is in use.
      if (binaryArray != nullptr)
      {
        data = binaryArray->GetValue(startPos + startRowIdx, &curSig.size);
      }
      else
      {
        data = fixedSizeBinaryArray->GetValue(startPos + startRowIdx);
        std::shared_ptr<arrow::DataType> tType = fixedSizeBinaryArray->type();
        curSig.size = tType->byte_width();
      }

      const char* dataPtr = reinterpret_cast<const char*>(data);

      // Strip trailing null bytes '\0' (by adjusting curSig.size) if import-
      // ing in binary mode.  If entire string is binary zeros, then we treat
      // as a NULL value.
      if (curSig.size > 0)
      {
        const char* fld = dataPtr;
        int kk = curSig.size - 1;

        for (; kk >= 0; kk--)
        {
          if (fld[kk] != '\0')
            break;
        }
        curSig.size = kk + 1;
      }

      // Read thread should validate against max size so that the entire row
      // can be rejected up front.  Once we get here in the parsing thread,
      // it is too late to reject the row.  However, as a precaution, we
      // still check against max size & set to null token if needed.
      if ((curSig.size == 0) || (curSig.size > MAX_BLOB_SIZE))
      {
        if (m_defVal.length() > 0)  // use default string if available
        {
          pIn = m_defVal.str();
          curSig.signature = (unsigned char*)pIn;
          curSig.size = m_defVal.length();
        }
        else
        {
          // NULL and no default: emit the null token and skip insertion.
          memcpy(pOut + outOffset, &nullToken, 8);
          outOffset += 8;
          startPos++;
          continue;
        }
      }
      else
      {
        pIn = dataPtr;
        curSig.signature = (unsigned char*)pIn;
      }
    }
    else
    {
      // Entire column is a NULL array: every row gets the default string if
      // one exists, otherwise the null token.
      curSig.size = 0;

      if (m_defVal.length() > 0)  // use default string if available
      {
        pIn = m_defVal.str();
        curSig.signature = (unsigned char*)pIn;
        curSig.size = m_defVal.length();
      }
      else
      {
        memcpy(pOut + outOffset, &nullToken, 8);
        outOffset += 8;
        startPos++;
        continue;
      }
    }

    RETURN_ON_ERROR(insertDctnry1(curSig, found, pOut, outOffset, startPos, totalUseSize, cb, next,
                                  truncCount, cs, weType));
  }

#ifdef PROFILE
  Stats::stopParseEvent(WE_STATS_PARSE_DCT);
#endif
  // Done
  // If any data leftover and not written by subsequent call to
  // insertDctnry(), then it will be written by closeDctnry().

  return NO_ERROR;
}

/*******************************************************************************
 * Description:
 * Used by bulk import to insert collection of strings into this store file.
 * Function assumes that the file is already positioned to the current block.
 *
 * PARAMETERS:
 *    input
 *       buf - character buffer containing input strings
 *       pos - meta data describing data in "buf"
 *       totalRow - number of rows in "buf"
 *       col - column of strings to be parsed from "buf"
 *    output
 *       tokenBuf  - tokens assigned to inserted strings
 *
 * RETURN:
 *    NO_ERROR   - all strings were tokenized and their tokens written out
 *    other      - the error code from a failed signature insertion
 ******************************************************************************/
int Dctnry::insertDctnry(const char* buf, ColPosPair** pos, const int totalRow, const int col, char* tokenBuf,
                         long long& truncCount, const CHARSET_INFO* cs, const WriteEngine::ColType& weType)
{
#ifdef PROFILE
  Stats::startParseEvent(WE_STATS_PARSE_DCT);
#endif
  int startPos = 0;      // index of the row currently being processed
  int totalUseSize = 0;  // passed through to insertDctnry1 by reference

  int outOffset = 0;     // write offset into tokenBuf (8 bytes per row)
  const char* pIn;       // points at the current row's string bytes
  char* pOut = tokenBuf; // base of the output token buffer
  Signature curSig;
  bool found = false;
  bool next = false;
  CommBlock cb;
  cb.file.oid = m_dctnryOID;
  cb.file.pFile = m_dFile;
  WriteEngine::Token nullToken;  // default token emitted for NULL rows

  //...Loop through all the rows for the specified column
  while (startPos < totalRow)
  {
    found = false;
    void* curSigPtr = static_cast<void*>(&curSig);
    memset(curSigPtr, 0, sizeof(curSig));
    // Length of this row's field, per the parse metadata
    curSig.size = pos[startPos][col].offset;

    // Strip trailing null bytes '\0' (by adjusting curSig.size) if import-
    // ing in binary mode.  If entire string is binary zeros, then we treat
    // as a NULL value.
    if (m_importDataMode != IMPORT_DATA_TEXT)
    {
      if ((curSig.size > 0) && (curSig.size != COLPOSPAIR_NULL_TOKEN_OFFSET))
      {
        char* fld = (char*)buf + pos[startPos][col].start;
        int kk = curSig.size - 1;

        // Scan backwards past any trailing '\0' bytes
        for (; kk >= 0; kk--)
        {
          if (fld[kk] != '\0')
            break;
        }

        curSig.size = kk + 1;
      }
    }

    // Read thread should validate against max size so that the entire row
    // can be rejected up front.  Once we get here in the parsing thread,
    // it is too late to reject the row.  However, as a precaution, we
    // still check against max size & set to null token if needed.
    if ((curSig.size == 0) || (curSig.size == COLPOSPAIR_NULL_TOKEN_OFFSET) || (curSig.size > MAX_BLOB_SIZE))
    {
      if (m_defVal.length() > 0)  // use default string if available
      {
        pIn = m_defVal.str();
        curSig.signature = (unsigned char*)pIn;
        curSig.size = m_defVal.length();
      }
      else
      {
        // No default value: emit the null token for this row and move on
        memcpy(pOut + outOffset, &nullToken, 8);
        outOffset += 8;
        startPos++;
        continue;
      }
    }
    else
    {
      pIn = (char*)buf + pos[startPos][col].start;
      curSig.signature = (unsigned char*)pIn;
    }

    // Store the signature and emit its token.  NOTE(review): the loop body
    // does not advance startPos/outOffset on this path, so termination
    // relies on insertDctnry1 advancing them -- confirm in its definition.
    RETURN_ON_ERROR(insertDctnry1(curSig, found, pOut, outOffset, startPos, totalUseSize, cb, next,
                                  truncCount, cs, weType));
  }  // end while

#ifdef PROFILE
  Stats::stopParseEvent(WE_STATS_PARSE_DCT);
#endif
  // Done
  // If any data leftover and not written by subsequent call to
  // insertDctnry(), then it will be written by closeDctnry().

  return NO_ERROR;
}

/*******************************************************************************
 * DESCRIPTION:
 * Used by DML to insert a single string into this store file.
 * (1) Insert a signature value into the block
 * (2) The header information inserted at front
 * (3) The signature inserted from back
 * (4) Total minimum header size -- free space 2 bytes, next pointer 8 bytes,
 *     zero offset 2 bytes, end of header 2 bytes: 14 bytes in all,
 *     plus 2 bytes for the new value's starting offset entry,
 *     for a total of 16 bytes
 * (5) Values size <=8176 =(8192-16) will not be split into two blocks
 * (6) For smaller value <=8176, it has to fit into one block or
 *     unsuccessfully to insert
 * (7) For large value > 8176,
 *      smaller space first then take up a whole block
 *      or a whole block first then some left over space in another
 *     block
 * (8) limit to 8000 byte for this release size
 *
 * PARAMETERS:
 *    input dFile
 *        -- File handle
 *    Input  sgnature_size
 *        -- how many bytes the signature occupies
 *    Input  sgnature_value
 *        -- the value of the signature
 *    output token
 *        -- token structure carrying the assigned fbo and op
 *
 * RETURN:
 *    success    - successfully inserted the signature
 *    failure    - the signature could not be inserted
 ******************************************************************************/
int Dctnry::insertDctnry(const int& sgnature_size, const unsigned char* sgnature_value, Token& token)
{
  int rc = 0;
  int i;
  unsigned char* value = NULL;   // cursor into the signature bytes
  int size;                      // bytes of the signature still to be written
  int write_size;                // bytes written into the current block
  bool lbid_in_token = false;    // set once the start LBID is stored in token

  // Round down for safety. In theory we can take 262143 * 8176 bytes
  if (sgnature_size > MAX_BLOB_SIZE)
  {
    return ERR_DICT_SIZE_GT_2G;
  }

  CommBlock cb;
  cb.file.oid = m_dctnryOID;
  cb.file.pFile = m_dFile;

  size = sgnature_size;
  value = (unsigned char*)sgnature_value;
  token.bc = 0;

  // Starting at the last block used, write the signature, spilling into
  // further blocks (and new extents) until all bytes are written.
  for (i = m_lastFbo; i < m_numBlocks; i++)
  {
    // Use this block if it can hold the whole value (plus one header slot),
    // or if the value is large (> 8176) and the block has any room at all.
    // @bug 3960: Add MAX_OP_COUNT check to handle case after bulk rollback
    if (((m_freeSpace - HDR_UNIT_SIZE >= size) || ((size > 8176) && (m_freeSpace > HDR_UNIT_SIZE))) &&
        (m_curOp < (MAX_OP_COUNT - 1)))
    {
      // found the perfect block; signature size fit in this block
      if (size > (m_freeSpace - HDR_UNIT_SIZE))
      {
        // Partial write; the remainder spills into the next block
        write_size = (m_freeSpace - HDR_UNIT_SIZE);
      }
      else
      {
        write_size = size;
      }

      insertDctnryHdr(m_curBlock.data, write_size);
      insertSgnture(m_curBlock.data, write_size, value);
      size -= write_size;
      value += write_size;
      m_curBlock.state = BLK_WRITE;

      // We only want the start LBID for a multi-block dict in the token
      if (!lbid_in_token)
      {
        token.fbo = m_curLbid;
        token.op = m_curOp;
        lbid_in_token = true;
      }

      // Bytes remain, so the token spans one more block
      if (size > 0)
        token.bc++;

      m_lastFbo = i;
      m_curFbo = m_lastFbo;

      // Fully written and header still has room: done
      if ((m_curOp < (MAX_OP_COUNT - 1)) && (size <= 0))
        return NO_ERROR;
    }  // end Found

    // Current block is exhausted (or header is full); flush it to disk and
    // start a fresh block initialized with an empty dictionary header.
    //@bug 3832. check error code
    RETURN_ON_ERROR(writeDBFile(cb, &m_curBlock, m_curLbid));
    memset(m_curBlock.data, 0, sizeof(m_curBlock.data));
    memcpy(m_curBlock.data, &m_dctnryHeader2, m_totalHdrBytes);
    m_freeSpace = BYTE_PER_BLOCK - m_totalHdrBytes;
    m_curBlock.state = BLK_WRITE;
    m_curOp = 0;
    m_lastFbo++;
    m_curFbo = m_lastFbo;

    //...Expand current extent if it is an abbreviated initial extent
    if ((m_curFbo == m_numBlocks) && (m_numBlocks == NUM_BLOCKS_PER_INITIAL_EXTENT))
    {
      RETURN_ON_ERROR(expandDctnryExtent());
    }

    //...Allocate a new extent if we have reached the last block in the
    //   current extent.
    if (m_curFbo == m_numBlocks)
    {
      // last block
      // for roll back the extent to use
      // Save those empty extents in case of failure to rollback
      std::vector<ExtentInfo> dictExtentInfo;
      ExtentInfo info;
      info.oid = m_dctnryOID;
      info.partitionNum = m_partition;
      info.segmentNum = m_segment;
      info.dbRoot = m_dbRoot;
      info.hwm = m_hwm;
      info.newFile = false;
      dictExtentInfo.push_back(info);
      LBID_t startLbid;
      // Add an extent.
      rc = createDctnry(m_dctnryOID,
                        0,  // dummy column width
                        m_dbRoot, m_partition, m_segment, startLbid, false);

      if (rc != NO_ERROR)
      {
        // roll back the extent
        BRMWrapper::getInstance()->deleteEmptyDictStoreExtents(dictExtentInfo);
        return rc;
      }
    }

    // Look up the LBID of the next block to write into
    RETURN_ON_ERROR(
        BRMWrapper::getInstance()->getBrmInfo(m_dctnryOID, m_partition, m_segment, m_curFbo, m_curLbid));
    m_curBlock.lbid = m_curLbid;
  }  // end for loop for all of the blocks

  return ERR_DICT_NO_SPACE_INSERT;
}

/*******************************************************************************
 * Description
 * Update the block header (and data members like m_freeSpace,
 * m_newStartOffset, etc), to reflect the insertion of string of size "size"
 *
 * PARAMETERS:
 *    input
 *        blockBuf
 *        --the block buffer
 *    input
 *        size
 *        --Size of the signature value
 *
 * RETURN:
 *    none
 ******************************************************************************/
void Dctnry::insertDctnryHdr(unsigned char* blockBuf, const int& size)
{
  // Header slot locations (each slot is HDR_UNIT_SIZE bytes):
  //   endHdrLoc     - where the end-of-header marker moves to
  //   nextOffsetLoc - slot that records the new string's start offset
  //   lastOffsetLoc - slot holding the previous string's start offset;
  //     for m_curOp == 0 this is the slot just before START_HDR1, which
  //     presumably holds the block-size sentinel (see getBlockOpCount's
  //     idbassert) -- TODO confirm against the header layout
  int endHdrLoc = START_HDR1 + (m_curOp + 1) * HDR_UNIT_SIZE;
  int nextOffsetLoc = START_HDR1 + m_curOp * HDR_UNIT_SIZE;
  int lastOffsetLoc = START_HDR1 + (m_curOp - 1) * HDR_UNIT_SIZE;

  // Account for the string bytes plus the header slot it consumes
  m_freeSpace -= (size + HDR_UNIT_SIZE);
  memcpy(&blockBuf[endHdrLoc], &m_endHeader, HDR_UNIT_SIZE);
  // Strings grow downward from the end of the block: the new string starts
  // "size" bytes below the previous string's start offset
  uint16_t lastOffset = *(uint16_t*)&blockBuf[lastOffsetLoc];
  uint16_t nextOffset = lastOffset - size;

  // Persist updated free-space count (first HDR_UNIT_SIZE bytes of the
  // header) and the new string's offset slot
  memcpy(&blockBuf[0], &m_freeSpace, HDR_UNIT_SIZE);
  memcpy(&blockBuf[nextOffsetLoc], &nextOffset, HDR_UNIT_SIZE);
  m_newStartOffset = nextOffset;  // consumed by insertSgnture()
  m_curOp++;
}

/*******************************************************************************
 * DESCRIPTION:
 * Insert the specified string into the block buffer.
 *
 * PARAMETERS:
 *    Input blockBuf
 *        --block buffer
 *    Input size
 *        -- size of the signature value
 *    Input value
 *        -- value of the signature
 *
 * RETURN:
 *    none
 ******************************************************************************/
void Dctnry::insertSgnture(unsigned char* blockBuf, const int& size, unsigned char* value)
{
  // m_newStartOffset was computed by insertDctnryHdr() immediately before
  // this call; copy the signature bytes into the block at that position.
  unsigned char* dest = blockBuf + m_newStartOffset;
  memcpy(dest, value, size);
}

/*******************************************************************************
 * Description:
 * get the op count for a block
 * input
 *      DataBlock& fileBlock -- the file block
 * output
 *      op_count - total op count
 ******************************************************************************/
void Dctnry::getBlockOpCount(const DataBlock& fileBlock, int& op_count)
{
  ByteStream bs;
  ByteStream::byte inbuf[BYTE_PER_BLOCK];
  memcpy(inbuf, fileBlock.data, BYTE_PER_BLOCK);
  bs.load(inbuf, BYTE_PER_BLOCK);

  ByteStream::doublebyte offset;
  ByteStream::doublebyte dbyte;
  bs >> m_freeSpace;
  bs >> dbyte;
  bs >> dbyte;
  bs >> dbyte;
  bs >> dbyte;
  bs >> dbyte;
  idbassert(dbyte == BYTE_PER_BLOCK);
  bs >> offset;

  while (offset < 0xffff)
  {
    op_count++;
    bs >> offset;
  }
}

/*******************************************************************************
 * Description:
 * Loads the string cache from the specified DataBlock, which should be
 * the first block in the applicable dictionary store file.
 * input
 *      DataBlock& fileBlock -- the file block
 ******************************************************************************/
void Dctnry::preLoadStringCache(const DataBlock& fileBlock)
{
  // Header slot locations: hdrOffsetEnd is the sentinel slot (upper bound
  // of the string region); hdrOffsetBeg is the first string's start offset.
  // Strings are stored back-to-front, so entry N spans [offBeg, offEnd).
  int hdrOffsetBeg = HDR_UNIT_SIZE + NEXT_PTR_BYTES + HDR_UNIT_SIZE;
  int hdrOffsetEnd = HDR_UNIT_SIZE + NEXT_PTR_BYTES;
  uint16_t offBeg = 0;
  uint16_t offEnd = 0;
  memcpy(&offBeg, &fileBlock.data[hdrOffsetBeg], HDR_UNIT_SIZE);
  memcpy(&offEnd, &fileBlock.data[hdrOffsetEnd], HDR_UNIT_SIZE);

  int op = 1;  // ordinal position of the string within the block
  Signature aSig;
  void* aSigPtr = static_cast<void*>(&aSig);
  memset(aSigPtr, 0, sizeof(aSig));

  // Walk the offset list until the end-of-header marker, caching each
  // string up to MAX_STRING_CACHE_SIZE entries.
  while ((offBeg != DCTNRY_END_HEADER) && (op <= MAX_STRING_CACHE_SIZE))
  {
    unsigned int len = offEnd - offBeg;
    aSig.size = len;
    // Deep-copy the bytes; the cache entry owns this allocation
    aSig.signature = new unsigned char[len];
    memcpy(aSig.signature, &fileBlock.data[offBeg], len);
    aSig.token.op = op;
    aSig.token.fbo = m_curLbid;
    m_sigArray.insert(aSig);

    // Advance to the next slot: current begin becomes the next string's end
    offEnd = offBeg;
    hdrOffsetBeg += HDR_UNIT_SIZE;
    memcpy(&offBeg, &fileBlock.data[hdrOffsetBeg], HDR_UNIT_SIZE);
    op++;
  }

  m_arraySize = op - 1;  // number of strings actually cached

  // std::cout << "Preloading strings..." << std::endl;
  // char strSig[1000];
  // uint64_t tokenVal;
  // for (int i=0; i<m_arraySize; i++)
  //{
  //  memcpy(strSig, m_sigArray[i].signature, m_sigArray[i].size );
  //  memcpy(&tokenVal, &m_sigArray[i].token, sizeof(uint64_t));
  //  strSig[m_sigArray[i].size] = '\0';
  //  std::cout << "op-"      << m_sigArray[i].token.op  <<
  //               "; fbo-"   << m_sigArray[i].token.fbo <<
  //               "; sig-"   << strSig   <<
  //               "; token-" << tokenVal << std::endl;
  //}
}

/*******************************************************************************
 * Description:
 * Add the specified signature (string) to the string cache.
 * input
 *      newSig -- Signature string to be added to the string cache.
 ******************************************************************************/
void Dctnry::addToStringCache(const Signature& newSig)
{
  // We better add constructors that sets everything to 0;
  Signature asig;
  void* aSigPtr = static_cast<void*>(&asig);
  memset(aSigPtr, 0, sizeof(asig));
  asig.signature = new unsigned char[newSig.size];
  memcpy(asig.signature, newSig.signature, newSig.size);
  asig.size = newSig.size;
  asig.token = newSig.token;
  m_sigArray.insert(asig);
  m_arraySize++;
}

/*******************************************************************************
 * Description:
 * get the location of the end of header
 * input
 *      dFile - file handle
 *      lbid  - block of interest
 * output
 *      endOp - ordinal position of the end of header for "lbid"
 *
 * return value
 *        Success -- found and deleted
 *        Fail    -- ERR_DICT_INVALID_DELETE
 ******************************************************************************/
int Dctnry::getEndOp(IDBDataFile* dFile, int lbid, int& endOp)
{
  DataBlock fileBlock;
  Offset newOffset;
  int rc;
  CommBlock cb;
  cb.file.oid = m_dctnryOID;
  cb.file.pFile = dFile;
  memset(fileBlock.data, 0, sizeof(fileBlock.data));
  m_dFile = dFile;

  // Read the block header (free space, next pointer, first offsets)
  rc = readSubBlockEntry(cb, &fileBlock, lbid, 0, 0,
                         HDR_UNIT_SIZE + NEXT_PTR_BYTES + HDR_UNIT_SIZE + HDR_UNIT_SIZE, &m_dctnryHeader);

  // Bug fix: do not parse the block if the read failed.  fileBlock was
  // zero-filled above, and a block of zeros contains no DCTNRY_END_HEADER
  // marker, so the offset walk below would run past the end of the buffer.
  if (rc != NO_ERROR)
    return rc;

  memcpy(&m_freeSpace, &fileBlock.data[0], HDR_UNIT_SIZE);
  memcpy(&m_nextPtr, &fileBlock.data[HDR_UNIT_SIZE], NEXT_PTR_BYTES);

  // Walk the header's offset list until the end-of-header marker, counting
  // slots to find the ordinal position of the end of header.
  newOffset.hdrLoc = HDR_UNIT_SIZE + NEXT_PTR_BYTES + HDR_UNIT_SIZE;
  memcpy(&newOffset.offset, &fileBlock.data[newOffset.hdrLoc], HDR_UNIT_SIZE);
  endOp = 1;  // should be zero counting the end of header then

  while (newOffset.offset != DCTNRY_END_HEADER)
  {
    newOffset.hdrLoc += HDR_UNIT_SIZE;
    memcpy(&newOffset.offset, &fileBlock.data[newOffset.hdrLoc], HDR_UNIT_SIZE);
    endOp++;
  }

  return rc;
}

/*******************************************************************************
 * Add a signature value to the dictionary store.
 * Function first checks to see if the signature is already
 * in our string cache, and returns the corresponding token
 * if it is found in the cache.
 ******************************************************************************/
int Dctnry::updateDctnry(unsigned char* sigValue, int& sigSize, Token& token)
{
  int rc = NO_ERROR;
  Signature sig;
  sig.signature = sigValue;
  sig.size = sigSize;

  // Look for string in cache
  // As long as the string <= 8000 bytes
  if (sigSize <= MAX_SIGNATURE_SIZE)
  {
    bool found = false;
    found = getTokenFromArray(sig);

    if (found)
    {
      token = sig.token;
      return NO_ERROR;
    }
  }

  // Insert into Dictionary
  rc = insertDctnry(sigSize, sigValue, token);

  // Add the new signature and token into cache
  // As long as the string is <= 8000 bytes
  if ((m_arraySize < MAX_STRING_CACHE_SIZE) && (sigSize <= MAX_SIGNATURE_SIZE))
  {
    Signature sig;
    sig.size = sigSize;
    sig.signature = new unsigned char[sigSize];
    memcpy(sig.signature, sigValue, sigSize);
    sig.token = token;
    m_sigArray.insert(sig);
    m_arraySize++;
  }

  return rc;
}

/*******************************************************************************
 * open dictionary file
 ******************************************************************************/
// Create/open a dictionary store file by path.  The unnamed int and the
// lbid arguments are accepted for interface compatibility but not used by
// this implementation.
IDBDataFile* Dctnry::createDctnryFile(const char* name, int /*unused*/, const char* mode, int ioBuffSize,
                                      LBID_t /*lbid*/)
{
  return openFile(name, mode, ioBuffSize, false);
}

/*******************************************************************************
 * open dictionary file
 ******************************************************************************/
// @bug 5572 - HDFS usage: add *.tmp file backup flag
// Open this dictionary's segment file for read/update in binary mode.
// @bug 5572 - HDFS usage: useTmpSuffix selects the *.tmp file backup flag.
IDBDataFile* Dctnry::openDctnryFile(bool useTmpSuffix)
{
  const char* openMode = "r+b";
  return openFile(m_dctnryOID, m_dbRoot, m_partition, m_segment, m_segFileName, openMode, DEFAULT_COLSIZ,
                  useTmpSuffix);
}

/*******************************************************************************
 * close dictionary file
 ******************************************************************************/
// Close the underlying segment file and drop our handle.  The flush flag
// and oid map are part of the interface but unused by this implementation.
void Dctnry::closeDctnryFile(bool /*doFlush*/, std::map<FID, FID>& /*oids*/)
{
  closeFile(m_dFile);
  m_dFile = nullptr;
}

int Dctnry::numOfBlocksInFile()
{
  long long fileSizeBytes = 0;
  getFileSize(m_dFile, fileSizeBytes);  // dmc-error handling (ignoring rc)
  return fileSizeBytes / BYTE_PER_BLOCK;
}

// Copy the prebuilt dictionary block header (m_dctnryHeader2,
// m_totalHdrBytes long) into the caller-supplied buffer.  The caller must
// provide at least m_totalHdrBytes of writable space.
void Dctnry::copyDctnryHeader(void* buf)
{
  memcpy(buf, m_dctnryHeader2, m_totalHdrBytes);
}

}  // namespace WriteEngine
